// Copyright 2018 The Cockroach Authors.
// Copyright (c) 2022-present, Shanghai Yunxi Technology Co, Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This software (KWDB) is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//          http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

package xform

import (
	"math/rand"
	"reflect"

	"gitee.com/kwbasedb/kwbase/pkg/keys"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/cat"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/memo"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/norm"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/ordering"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/props/physical"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgcode"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgerror"
	"gitee.com/kwbasedb/kwbase/pkg/sql/sem/tree"
	"gitee.com/kwbasedb/kwbase/pkg/util"
	"gitee.com/kwbasedb/kwbase/pkg/util/errorutil"
	"gitee.com/kwbasedb/kwbase/pkg/util/log"
	"github.com/cockroachdb/errors"
)

// MatchedRuleFunc defines the callback function for the NotifyOnMatchedRule
// event supported by the optimizer. See the comment in factory.go for more
// details.
type MatchedRuleFunc = norm.MatchedRuleFunc

// AppliedRuleFunc defines the callback function for the NotifyOnAppliedRule
// event supported by the optimizer. See the comment in factory.go for more
// details.
type AppliedRuleFunc = norm.AppliedRuleFunc

// RuleSet efficiently stores an unordered set of RuleNames.
type RuleSet = util.FastIntSet

// Optimizer transforms an input expression tree into the logically equivalent
// output expression tree with the lowest possible execution cost.
//
// To use the optimizer, construct an input expression tree by invoking
// construction methods on the Optimizer.Factory instance. The factory
// transforms the input expression into its canonical form as part of
// construction. Pass the root of the tree constructed by the factory to the
// Optimize method, along with a set of required physical properties that the
// expression must provide. The optimizer will return an Expr over the output
// expression tree with the lowest cost.
type Optimizer struct {
	evalCtx *tree.EvalContext

	// f is the factory that creates the normalized expressions during the first
	// optimization phase.
	f norm.Factory

	// catalog is the Catalog object that's used to resolve SQL names.
	catalog cat.Catalog

	// mem is the Memo data structure containing the forest of query plans that
	// are generated by the optimizer.
	mem *memo.Memo

	// explorer generates alternate, equivalent expressions and stores them in
	// the memo.
	explorer explorer

	// defaultCoster implements the default cost model. If SetCoster is not
	// called, this coster will be used.
	defaultCoster coster

	// coster is set by default to reference defaultCoster, but can be overridden
	// by calling SetCoster.
	coster Coster

	// stateMap allocates temporary storage that's used to speed up optimization.
	// This state could be discarded once optimization is complete.
	stateMap   map[groupStateKey]*groupState
	stateAlloc groupStateAlloc

	// matchedRule is the callback function that is invoked each time an
	// optimization rule (Normalize or Explore) has been matched by the optimizer.
	// It can be set via a call to the NotifyOnMatchedRule method.
	matchedRule MatchedRuleFunc

	// appliedRule is the callback function which is invoked each time an
	// optimization rule (Normalize or Explore) has been applied by the optimizer.
	// It can be set via a call to the NotifyOnAppliedRule method.
	appliedRule AppliedRuleFunc

	// disabledRules is a set of rules that are not allowed to run, used for
	// testing.
	disabledRules RuleSet

	// JoinOrderBuilder adds new join orderings to the memo.
	jb *JoinOrderBuilder
}

// Init initializes the Optimizer with a new, blank memo structure inside. This
// must be called before the optimizer can be used (or reused).
func (o *Optimizer) Init(evalCtx *tree.EvalContext, catalog cat.Catalog) {
	o.evalCtx = evalCtx
	o.catalog = catalog
	o.f.Init(evalCtx, catalog)
	o.mem = o.f.Memo()
	o.explorer.init(o)
	o.defaultCoster.Init(evalCtx, o.mem, evalCtx.TestingKnobs.OptimizerCostPerturbation)
	o.coster = &o.defaultCoster
	o.stateMap = make(map[groupStateKey]*groupState)
	o.matchedRule = nil
	o.appliedRule = nil
	o.jb = &JoinOrderBuilder{}
	if evalCtx.TestingKnobs.DisableOptimizerRuleProbability > 0 {
		o.disableRules(evalCtx.TestingKnobs.DisableOptimizerRuleProbability)
	}
}

// DetachMemo extracts the memo from the optimizer, and then re-initializes the
// optimizer so that its reuse will not impact the detached memo. This method is
// used to extract a read-only memo during the PREPARE phase.
func (o *Optimizer) DetachMemo() *memo.Memo {
	detach := o.f.DetachMemo()
	o.Init(o.evalCtx, o.catalog)
	return detach
}

// Factory returns a factory interface that the caller uses to construct an
// input expression tree. The root of the resulting tree can be passed to the
// Optimize method in order to find the lowest cost plan.
func (o *Optimizer) Factory() *norm.Factory {
	return &o.f
}

// Coster returns the coster instance that the optimizer is currently using to
// estimate the cost of executing portions of the expression tree. When a new
// optimizer is constructed, it creates a default coster that will be used
// unless it is overridden with a call to SetCoster.
func (o *Optimizer) Coster() Coster {
	return o.coster
}

// SetCoster overrides the default coster. The optimizer will now use the given
// coster to estimate the cost of expression execution.
func (o *Optimizer) SetCoster(coster Coster) {
	o.coster = coster
}

// DisableOptimizations disables all transformation rules, including normalize
// and explore rules. The unaltered input expression tree becomes the output
// expression tree (because no transforms are applied).
func (o *Optimizer) DisableOptimizations() {
	o.NotifyOnMatchedRule(func(opt.RuleName) bool { return false })
}

// NotifyOnMatchedRule sets a callback function which is invoked each time an
// optimization rule (Normalize or Explore) has been matched by the optimizer.
// If matchedRule is nil, then no notifications are sent, and all rules are
// applied by default. In addition, callers can invoke the DisableOptimizations
// convenience method to disable all rules.
func (o *Optimizer) NotifyOnMatchedRule(matchedRule MatchedRuleFunc) {
	o.matchedRule = matchedRule

	// Also pass through the call to the factory so that normalization rules
	// make same callback.
	o.f.NotifyOnMatchedRule(matchedRule)
}

// NotifyOnAppliedRule sets a callback function which is invoked each time an
// optimization rule (Normalize or Explore) has been applied by the optimizer.
// If appliedRule is nil, then no further notifications are sent.
func (o *Optimizer) NotifyOnAppliedRule(appliedRule AppliedRuleFunc) {
	o.appliedRule = appliedRule

	// Also pass through the call to the factory so that normalization rules
	// make same callback.
	o.f.NotifyOnAppliedRule(appliedRule)
}

// Memo returns the memo structure that the optimizer is using to optimize.
func (o *Optimizer) Memo() *memo.Memo {
	return o.mem
}

// Optimize returns the expression which satisfies the required physical
// properties at the lowest possible execution cost, but is still logically
// equivalent to the given expression. If there is a cost "tie", then any one
// of the qualifying lowest cost expressions may be selected by the optimizer.
func (o *Optimizer) Optimize() (_ opt.Expr, err error) {
	defer func() {
		if r := recover(); r != nil {
			// This code allows us to propagate internal errors without having to add
			// error checks everywhere throughout the code. This is only possible
			// because the code does not update shared state and does not manipulate
			// locks.
			if ok, e := errorutil.ShouldCatch(r); ok {
				err = e
			} else {
				// Other panic objects can't be considered "safe" and thus are
				// propagated as crashes that terminate the session.
				panic(r)
			}
		}
	}()

	if o.mem.IsOptimized() {
		return nil, errors.AssertionFailedf("cannot optimize a memo multiple times")
	}

	// Optimize the root expression according to the properties required of it.
	o.optimizeRootWithProps()

	// Now optimize the entire expression tree.
	root := o.mem.RootExpr().(memo.RelExpr)
	exprMap := make(map[interface{}]struct{}, 0)
	memo.RebuildExprs(root, exprMap)

	rootProps := o.mem.RootProps()
	o.optimizeGroup(root, rootProps)

	// Walk the tree from the root, updating child pointers so that the memo
	// root points to the lowest cost tree by default (rather than the normalized
	// tree by default.
	root = o.setLowestCostTree(root, rootProps).(memo.RelExpr)
	if o.mem.TSSupportAllProcessor() {
		err = o.mem.CheckWhiteListAndAddSynchronize(&root)
		if err != nil {
			return root, err
		}
	} else {
		o.mem.DealTSScan(root.(memo.RelExpr))
		o.mem.AddOrderWithGroupWindow(root.(memo.RelExpr))
	}

	o.mem.SetRoot(root, rootProps)

	// Validate there are no dangling references.
	if !root.Relational().OuterCols.Empty() {
		return nil, errors.AssertionFailedf(
			"top-level relational expression cannot have outer columns: %s",
			errors.Safe(root.Relational().OuterCols),
		)
	}

	return root, nil
}

// optimizeExpr calls either optimizeGroup or optimizeScalarExpr depending on
// the type of the expression (relational or scalar).
func (o *Optimizer) optimizeExpr(
	e opt.Expr, required *physical.Required,
) (cost memo.Cost, fullyOptimized bool) {
	switch t := e.(type) {
	case memo.RelExpr:
		state := o.optimizeGroup(t, required)
		return state.cost, state.fullyOptimized

	case memo.ScalarPropsExpr:
		// Short-circuit traversal of scalar expressions with no nested subquery,
		// since there's only one possible tree.
		if !t.ScalarProps().HasSubquery {
			return 0, true
		}
		return o.optimizeScalarExpr(t)

	case opt.ScalarExpr:
		return o.optimizeScalarExpr(t)

	default:
		panic(errors.AssertionFailedf("unhandled child: %+v", e))
	}
}

// optimizeGroup enumerates expression trees rooted in the given memo group and
// finds the expression tree with the lowest cost (i.e. the "best") that
// provides the given required physical properties. Enforcers are added as
// needed to provide the required properties.
//
// The following is a simplified walkthrough of how the optimizer might handle
// the following SQL query:
//
//	SELECT * FROM a WHERE x=1 ORDER BY y
//
// Before the optimizer is invoked, the memo group contains a single normalized
// expression:
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Optimization begins at the root of the memo (group #1), and calls
// optimizeGroup with the properties required of that group ("ordering:y").
// optimizeGroup then calls optimizeGroupMember for the Select expression, which
// checks whether the expression can provide the required properties. Since
// Select is a pass-through operator, it can provide the properties by passing
// through the requirement to its input. Accordingly, optimizeGroupMember
// recursively invokes optimizeGroup on select's input child (group #2), with
// the same set of required properties.
//
// Now the same set of steps are applied to group #2. However, the Scan
// expression cannot provide the required ordering (say because it's ordered on
// x rather than y). The optimizer must add a Sort enforcer. It does this by
// recursively invoking optimizeGroup on the same group #2, but this time
// without the ordering requirement. The Scan operator is capable of meeting
// these reduced requirements, so it is costed and added as the current lowest
// cost expression for that group for that set of properties (i.e. the empty
// set).
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// The recursion pops up a level, and now the Sort enforcer knows its input,
// and so it too can be costed (cost of input + extra cost of sort) and added
// as the best expression for the property set with the ordering requirement.
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Recursion pops up another level, and the Select operator now knows its input
// (the Sort of the Scan). It then moves on to its scalar filter child and
// optimizes it and its descendants, which is relatively uninteresting since
// there are no subqueries to consider (in fact, the optimizer recognizes this
// and skips traversal altogether). Once all children have been optimized, the
// Select operator can now be costed and added as the best expression for the
// ordering requirement. It requires the same ordering requirement from its
// input child (i.e. the scan).
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    └── [ordering: y]
//	 │         ├── best: (select G2="ordering: y" G3)
//	 │         └── cost: 160.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// But the process is not yet complete. After traversing the Select child
// groups, optimizeExpr generates an alternate plan that satisfies the ordering
// property by using a top-level enforcer. It does this by recursively invoking
// optimizeGroup for group #1, but without the ordering requirement, analogous
// to what it did for group #2. This triggers optimization for each child group
// of the Select operator. But this time, the memo already has fully-costed
// best expressions available for both the Input and Filter children, and so
// returns them immediately with no extra work. The Select expression is now
// costed and added as the best expression without an ordering requirement.
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (select G2="ordering: y" G3)
//	 │    │    └── cost: 160.00
//	 │    └── []
//	 │         ├── best: (select G2 G3)
//	 │         └── cost: 110.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Finally, the Sort enforcer for group #1 has its input and can be costed. But
// rather than costing 50.0 like the other Sort enforcer, this one only costs
// 1.0, because it's sorting a tiny set of filtered rows. That means its total
// cost is only 111.0, which makes it the new best expression for group #1 with
// an ordering requirement:
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G1)
//	 │    │    └── cost: 111.00
//	 │    └── []
//	 │         ├── best: (select G2 G3)
//	 │         └── cost: 110.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Now the memo has been fully optimized, and the best expression for group #1
// and "ordering:y" can be set as the root of the tree by setLowestCostTree:
//
//	sort
//	 ├── columns: x:1(int) y:2(int)
//	 ├── ordering: +2
//	 └── select
//	      ├── columns: x:1(int) y:2(int)
//	      ├── scan
//	      │    └── columns: x:1(int) y:2(int)
//	      └── eq [type=bool]
//	           ├── variable: a.x [type=int]
//	           └── const: 1 [type=int]
func (o *Optimizer) optimizeGroup(grp memo.RelExpr, required *physical.Required) *groupState {
	newRequired := o.buildSortRequired(grp, required)
	// Always start with the first expression in the group.
	grp = grp.FirstExpr()

	// If this group is already fully optimized, then return the already prepared
	// best expression (won't ever get better than this).
	state := o.ensureOptState(grp, newRequired)
	if state.fullyOptimized {
		return state
	}

	// Iterate until the group has been fully optimized.
	for {
		fullyOptimized := true

		for i, member := 0, grp; member != nil; i, member = i+1, member.NextExpr() {
			// If this group member has already been fully optimized for the given
			// required properties, then skip it, since it won't get better.
			if state.isMemberFullyOptimized(i) {
				continue
			}

			// Optimize the group member with respect to the required properties.
			memberOptimized := o.optimizeGroupMember(state, member, newRequired)

			// If any of the group members have not yet been fully optimized, then
			// the group is not yet fully optimized.
			if memberOptimized {
				state.markMemberAsFullyOptimized(i)
			} else {
				fullyOptimized = false
			}
		}

		// Now try to generate new expressions that are logically equivalent to
		// other expressions in this group.
		if o.shouldExplore(newRequired) && !o.explorer.exploreGroup(grp).fullyExplored {
			fullyOptimized = false
		}

		if fullyOptimized {
			state.fullyOptimized = true
			break
		}
	}

	if o.evalCtx.HintStmtEmbed || o.evalCtx.HintID > 0 {
		o.checkHintApply(state)
	}

	return state
}

// optimizeGroupMember determines whether the group member expression can
// provide the required properties. If so, it recursively optimizes the
// expression's child groups and computes the cost of the expression. In
// addition, optimizeGroupMember calls enforceProps to check whether enforcers
// can provide the required properties at a lower cost. The lowest cost
// expression is saved to groupState.
func (o *Optimizer) optimizeGroupMember(
	state *groupState, member memo.RelExpr, required *physical.Required,
) (fullyOptimized bool) {
	// Compute the cost for enforcers to provide the required properties. This
	// may be lower than the expression providing the properties itself. For
	// example, it might be better to sort the results of a hash join than to
	// use the results of a merge join that are already sorted, but at the cost
	// of requiring one of the merge join children to be sorted.
	fullyOptimized = o.enforceProps(state, member, required)

	// If the expression cannot provide the required properties, then don't
	// continue. But what if the expression is able to provide a subset of the
	// properties? That case is taken care of by enforceProps, which will
	// recursively optimize the group with property subsets and then add
	// enforcers to provide the remainder.
	if CanProvidePhysicalProps(member, required) {
		var cost memo.Cost
		for i, n := 0, member.ChildCount(); i < n; i++ {
			// Given required parent properties, get the properties required from
			// the nth child.
			childRequired := BuildChildPhysicalProps(o.mem, member, i, required)

			// Optimize the child with respect to those properties.
			childCost, childOptimized := o.optimizeExpr(member.Child(i), childRequired)

			// Accumulate cost of children.
			cost += childCost

			// If any child expression is not fully optimized, then the parent
			// expression is also not fully optimized.
			if !childOptimized {
				fullyOptimized = false
			}
		}

		// Check whether this is the new lowest cost expression.
		cost += o.coster.ComputeCost(member, required)
		o.ratchetCost(state, member, cost)
	}

	// If it is the filter layer of the time series scenario,
	// we need to consider adjusting the order of filtering conditions
	// to speed up the execution of queries.
	if opt.CheckOptMode(opt.TSQueryOptMode.Get(&o.evalCtx.Settings.SV), opt.FilterOptOrder) &&
		o.mem.TSSupportAllProcessor() &&
		o.mem.CheckFlag(opt.IncludeTSTable) {
		if selectExpr, ok := member.(*memo.SelectExpr); ok {
			o.mem.SortFilters(selectExpr, member.Relational())
		}
	}
	return fullyOptimized
}

// optimizeScalarExpr recursively optimizes the children of a scalar expression.
// This is only necessary when the scalar expression contains a subquery, since
// scalar expressions otherwise always have zero cost and only one possible
// plan.
func (o *Optimizer) optimizeScalarExpr(
	scalar opt.ScalarExpr,
) (cost memo.Cost, fullyOptimized bool) {
	fullyOptimized = true
	for i, n := 0, scalar.ChildCount(); i < n; i++ {
		childProps := BuildChildPhysicalPropsScalar(o.mem, scalar, i)
		childCost, childOptimized := o.optimizeExpr(scalar.Child(i), childProps)

		// Accumulate cost of children.
		cost += childCost

		// If any child expression is not fully optimized, then the parent
		// expression is also not fully optimized.
		if !childOptimized {
			fullyOptimized = false
		}
	}
	return cost, fullyOptimized
}

// enforceProps costs an expression where one of the physical properties has
// been provided by an enforcer rather than by the expression itself. There are
// two reasons why this is necessary/desirable:
//
//  1. The expression may not be able to provide the property on its own. For
//     example, a hash join cannot provide ordered results.
//  2. The enforcer might be able to provide the property at lower overall
//     cost. For example, an enforced sort on top of a hash join might be
//     lower cost than a merge join that is already sorted, but at the cost of
//     requiring one of its children to be sorted.
//
// Note that enforceProps will recursively optimize this same group, but with
// one less required physical property. The recursive call will eventually make
// its way back here, at which point another physical property will be stripped
// off, and so on. Afterwards, the group will have computed a lowest cost
// expression for each sublist of physical properties, from all down to none.
//
// Right now, the only physical property that can be provided by an enforcer is
// physical.Required.Ordering. When adding another enforceable property, also
// update shouldExplore, which should return true if enforceProps will explore
// the group by recursively calling optimizeGroup (by way of optimizeEnforcer).
func (o *Optimizer) enforceProps(
	state *groupState, member memo.RelExpr, required *physical.Required,
) (fullyOptimized bool) {
	// Strip off one property that can be enforced. Other properties will be
	// stripped by recursively optimizing the group with successively fewer
	// properties. The properties are stripped off in a heuristic order, from
	// least likely to be expensive to enforce to most likely.
	if !required.Ordering.Any() {
		// Try Sort enforcer that requires no ordering from its input.
		enforcer := &memo.SortExpr{Input: member}
		memberProps := BuildChildPhysicalProps(o.mem, enforcer, 0, required)
		fullyOptimized = o.optimizeEnforcer(state, enforcer, required, member, memberProps)

		// Try Sort enforcer that requires a partial ordering from its input.
		longestCommonPrefix := deriveInterestingOrderingPrefix(member, required.Ordering)
		if len(longestCommonPrefix) > 0 && len(longestCommonPrefix) < len(required.Ordering.Columns) {
			enforcer := &memo.SortExpr{Input: state.best}
			enforcer.InputOrdering.FromOrdering(longestCommonPrefix)
			memberProps := BuildChildPhysicalProps(o.mem, enforcer, 0, required)
			if o.optimizeEnforcer(state, enforcer, required, member, memberProps) {
				fullyOptimized = true
			}
		}

		return fullyOptimized
	}

	return true
}

// deriveInterestingOrderingPrefix finds the longest prefix of the required ordering
// that is "interesting" as defined in Relational.Rule.InterestingOrderings.
func deriveInterestingOrderingPrefix(
	member memo.RelExpr, requiredOrdering physical.OrderingChoice,
) opt.Ordering {
	// Find the interesting orderings of the member expression.
	interestingOrderings := DeriveInterestingOrderings(member)

	// Find the longest interesting ordering that is a prefix of the required ordering.
	var longestCommonPrefix opt.Ordering
	for _, ordering := range interestingOrderings {
		var commonPrefix opt.Ordering
		for i, orderingCol := range ordering {
			if i < len(requiredOrdering.Columns) &&
				requiredOrdering.Columns[i].Group.Contains(orderingCol.ID()) &&
				requiredOrdering.Columns[i].Descending == orderingCol.Descending() {
				commonPrefix = append(commonPrefix, orderingCol)
			} else {
				break
			}
		}
		if len(commonPrefix) > len(longestCommonPrefix) {
			longestCommonPrefix = commonPrefix
		}
	}
	return longestCommonPrefix
}

// optimizeEnforcer optimizes and costs the enforcer.
func (o *Optimizer) optimizeEnforcer(
	state *groupState,
	enforcer memo.RelExpr,
	enforcerProps *physical.Required,
	member memo.RelExpr,
	memberProps *physical.Required,
) (fullyOptimized bool) {
	// Recursively optimize the member group with respect to a subset of the
	// enforcer properties.
	innerState := o.optimizeGroup(member, memberProps)
	fullyOptimized = innerState.fullyOptimized

	// Check whether this is the new lowest cost expression with the enforcer
	// added.
	cost := innerState.cost + o.coster.ComputeCost(enforcer, enforcerProps)
	if enforcerProps.MustAddSort {
		cost = 0
	}
	o.ratchetCost(state, enforcer, cost)

	// Enforcer expression is fully optimized if its input expression is fully
	// optimized.
	return fullyOptimized
}

// shouldExplore ensures that exploration is only triggered for optimizeGroup
// calls that will not recurse via a call from enforceProps.
func (o *Optimizer) shouldExplore(required *physical.Required) bool {
	return required.Ordering.Any()
}

// setLowestCostTree traverses the memo and recursively updates child pointers
// so that they point to the lowest cost expression tree rather than to the
// normalized expression tree. Each participating memo group is updated to store
// the physical properties required of it, as well as the estimated cost of
// executing the lowest cost expression in the group. As an example, say this is
// the memo, with a normalized tree containing the first expression in each of
// the groups:
//
//	memo
//	 ├── G1: (inner-join G2 G3 G4) (inner-join G3 G2 G4)
//	 ├── G2: (scan a)
//	 ├── G3: (select G5 G6) (scan b,constrained)
//	 ├── G4: (true)
//	 ├── G5: (scan b)
//	 ├── G6: (eq G7 G8)
//	 ├── G7: (variable b.x)
//	 └── G8: (const 1)
//
// setLowestCostTree is called after exploration is complete, and after each
// group member has been costed. If the second expression in groups G1 and G3
// have lower cost than the first expressions, then setLowestCostTree will
// update pointers so that the root expression tree includes those expressions
// instead.
//
// Note that a memo group can have multiple lowest cost expressions, each for a
// different set of physical properties. During optimization, these are retained
// in the groupState map. However, only one of those lowest cost expressions
// will be used in the final tree; the others are simply discarded. This is
// because there is never a case where a relational expression is referenced
// multiple times in the final tree, but with different physical properties
// required by each of those references.
func (o *Optimizer) setLowestCostTree(parent opt.Expr, parentProps *physical.Required) opt.Expr {
	var relParent memo.RelExpr
	var relCost memo.Cost
	var newProps *physical.Required
	switch t := parent.(type) {
	case memo.RelExpr:
		newProps = o.buildSortRequired(t, parentProps)
		state := o.lookupOptState(t.FirstExpr(), newProps)
		relParent, relCost = state.best, state.cost
		parent = relParent

	case memo.ScalarPropsExpr:
		// Short-circuit traversal of scalar expressions with no nested subquery,
		// since there's only one possible tree.
		if !t.ScalarProps().HasSubquery {
			return parent
		}
	}

	// Iterate over the expression's children, replacing any that have a lower
	// cost alternative.
	var mutable opt.MutableExpr
	var childProps *physical.Required
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		before := parent.Child(i)

		if relParent != nil {
			childProps = BuildChildPhysicalProps(o.mem, relParent, i, newProps)
		} else {
			childProps = BuildChildPhysicalPropsScalar(o.mem, parent, i)
		}

		after := o.setLowestCostTree(before, childProps)
		if after != before {
			if mutable == nil {
				mutable = parent.(opt.MutableExpr)
			}
			mutable.SetChild(i, after)
		}
	}

	if parent.Op() == opt.TSScanOp {
		// handle tsScan's children tagFilters and primaryTagFilters, set lowest cost
		tsScan, _ := parent.(*memo.TSScanExpr)
		for _, before := range tsScan.Flags.TagFilter {
			o.setLowestCostTree(before, childProps)
		}
		for _, before := range tsScan.Flags.PrimaryTagFilter {
			o.setLowestCostTree(before, childProps)
		}
	}

	if relParent != nil {
		var provided physical.Provided
		// BuildProvided relies on ProvidedPhysical() being set in the children, so
		// it must run after the recursive calls on the children.
		provided.Ordering = ordering.BuildProvided(relParent, &newProps.Ordering)
		o.mem.SetBestProps(relParent, newProps, &provided, relCost)
	}

	return parent
}

// ratchetCost computes the cost of the candidate expression, and then checks
// whether it's lower than the cost of the existing best expression in the
// group. If so, then the candidate becomes the new lowest cost expression.
func (o *Optimizer) ratchetCost(state *groupState, candidate memo.RelExpr, cost memo.Cost) {
	if state.best == nil || cost.Less(state.cost) {
		state.best = candidate
		state.cost = cost
	}
}

// lookupOptState looks up the state associated with the given group and
// properties. If no state exists yet, then lookupOptState returns nil.
func (o *Optimizer) lookupOptState(grp memo.RelExpr, required *physical.Required) *groupState {
	return o.stateMap[groupStateKey{group: grp, required: required}]
}

// ensureOptState looks up the state associated with the given group and
// properties. If none is associated yet, then ensureOptState allocates new
// state and returns it.
func (o *Optimizer) ensureOptState(grp memo.RelExpr, required *physical.Required) *groupState {
	key := groupStateKey{group: grp, required: required}
	state, ok := o.stateMap[key]
	if !ok {
		state = o.stateAlloc.allocate()
		state.required = required
		o.stateMap[key] = state
	}
	return state
}

// if expr contains tsInsertSelectExpr, deal with expr.input
func dealWithTsInsertSelect(expr memo.RelExpr) (memo.RelExpr, bool) {
	if e, ok := expr.(*memo.TSInsertSelectExpr); ok {
		return e.Input, true
	}
	// with ... as ...
	if e1, ok1 := expr.(*memo.WithExpr); ok1 {
		if e2, ok2 := e1.Main.(*memo.TSInsertSelectExpr); ok2 {
			return e2.Input, true
		}
	}
	return expr, false
}

// optimizeRootWithProps tries to simplify the root operator based on the
// properties required of it. This may trigger the creation of a new root and
// new properties.
func (o *Optimizer) optimizeRootWithProps() {
	root, ok := o.mem.RootExpr().(memo.RelExpr)
	if !ok {
		panic(errors.AssertionFailedf("Optimize can only be called on relational root expressions"))
	}

	// rootProps is bestExpr of subexpression in time series insert select,
	// handle TSInsertSelectExpr. execute some sql maybe stack error
	// so need to simplify subexpression
	// eg: insert into test_ts.ts_table2
	// select * from test_sts.stable as t1 where
	//	e1 in (select e1 from test_ts.ts_table where e1>1000 and t1.e2 < 2000 limit 1 offset 1);
	var isTsInsSel bool
	root, isTsInsSel = dealWithTsInsertSelect(root)
	rootProps := o.mem.RootProps()

	// [SimplifyRootOrdering]
	// SimplifyRootOrdering removes redundant columns from the root properties,
	// based on the operator's functional dependencies.
	// ps: simplifyRootOrdering is forbiddened if it is tsInsertSelect
	if rootProps.Ordering.CanSimplify(&root.Relational().FuncDeps) && !isTsInsSel {
		if o.matchedRule == nil || o.matchedRule(opt.SimplifyRootOrdering) {
			simplified := *rootProps
			simplified.Ordering = rootProps.Ordering.Copy()
			simplified.Ordering.Simplify(&root.Relational().FuncDeps)
			o.mem.SetRoot(root, &simplified)
			rootProps = o.mem.RootProps()
			if o.appliedRule != nil {
				o.appliedRule(opt.SimplifyRootOrdering, nil, root)
			}
		}
	}

	// [PruneRootCols]
	// PruneRootCols discards columns that are not needed by the root's ordering
	// or presentation properties.
	neededCols := rootProps.ColSet()
	if !neededCols.SubsetOf(root.Relational().OutputCols) {
		panic(errors.AssertionFailedf(
			"columns required of root %s must be subset of output columns %s",
			neededCols,
			root.Relational().OutputCols,
		))
	}
	if o.f.CustomFuncs().CanPruneCols(root, neededCols) {
		if o.matchedRule == nil || o.matchedRule(opt.PruneRootCols) {
			root = o.f.CustomFuncs().PruneCols(root, neededCols)
			// We may have pruned a column that appears in the required ordering.
			rootCols := root.Relational().OutputCols
			if !rootProps.Ordering.SubsetOfCols(rootCols) {
				newProps := *rootProps
				newProps.Ordering = rootProps.Ordering.Copy()
				newProps.Ordering.ProjectCols(rootCols)
				o.mem.SetRoot(root, &newProps)
				//lint:ignore SA4006 set rootProps in case another rule is added below.
				rootProps = o.mem.RootProps()
			} else {
				o.mem.SetRoot(root, rootProps)
			}
			if o.appliedRule != nil {
				o.appliedRule(opt.PruneRootCols, nil, root)
			}
		}
	}
}

// we need to rebuild the required if the SortExpr is saved from fromSubquery.
func (o *Optimizer) buildSortRequired(
	grp memo.RelExpr, required *physical.Required,
) *physical.Required {
	var newRequired *physical.Required
	if s, ok := grp.(*memo.SortExpr); ok && s.InputOrdering.CheckFlag(physical.OrderFromSubquerySave) {
		newRequired = BuildChildPhysicalProps(o.mem, s, 0, required)
	} else {
		newRequired = required
	}
	return newRequired
}

// groupStateKey associates groupState with a group that is being optimized with
// respect to a set of physical properties.
type groupStateKey struct {
	group    memo.RelExpr
	required *physical.Required
}

// groupState is temporary storage that's associated with each group that's
// optimized (or same group with different sets of physical properties). The
// optimizer stores various flags and other state here that allows it to do
// quicker lookups and short-circuit already traversed parts of the expression
// tree.
type groupState struct {
	// best identifies the lowest cost expression in the memo group for a given
	// set of physical properties.
	best memo.RelExpr

	// required is the set of physical properties that must be provided by this
	// lowest cost expression. An expression that cannot provide these properties
	// cannot be the best expression, no matter how low its cost.
	required *physical.Required

	// cost is the estimated execution cost for this expression. The best
	// expression for a given group and set of physical properties is the
	// expression with the lowest cost.
	cost memo.Cost

	// fullyOptimized is set to true once the lowest cost expression has been
	// found for a memo group, with respect to the required properties. A lower
	// cost expression will never be found, no matter how many additional
	// optimization passes are made.
	fullyOptimized bool

	// fullyOptimizedExprs contains the set of ordinal positions of each member
	// expression in the group that has been fully optimized for the required
	// properties. These never need to be recosted, no matter how many additional
	// optimization passes are made.
	fullyOptimizedExprs util.FastIntSet

	// explore is used by the explorer to store intermediate state so that
	// redundant work is minimized.
	explore exploreState
}

// isMemberFullyOptimized returns true if the group member at the given ordinal
// position has been fully optimized for the required properties. The expression
// never needs to be recosted, no matter how many additional optimization passes
// are made.
func (os *groupState) isMemberFullyOptimized(ord int) bool {
	return os.fullyOptimizedExprs.Contains(ord)
}

// markMemberAsFullyOptimized marks the group member at the given ordinal
// position as fully optimized for the required properties. The expression never
// needs to be recosted, no matter how many additional optimization passes are
// made.
func (os *groupState) markMemberAsFullyOptimized(ord int) {
	if os.fullyOptimized {
		panic(errors.AssertionFailedf("best expression is already fully optimized"))
	}
	if os.isMemberFullyOptimized(ord) {
		panic(errors.AssertionFailedf("memo expression is already fully optimized for required physical properties"))
	}
	os.fullyOptimizedExprs.Add(ord)
}

// groupStateAlloc allocates pages of groupState structs. This is preferable to
// a slice of groupState structs because pointers are not invalidated when a
// resize occurs, and because there's no need to retain a stable index.
type groupStateAlloc struct {
	page []groupState
}

// allocate returns a pointer to a new, empty groupState struct. The pointer is
// stable, meaning that its location won't change as other groupState structs
// are allocated.
func (a *groupStateAlloc) allocate() *groupState {
	if len(a.page) == 0 {
		a.page = make([]groupState, 8)
	}
	state := &a.page[0]
	a.page = a.page[1:]
	return state
}

// disableRules disables rules with the given probability for testing.
func (o *Optimizer) disableRules(probability float64) {
	essentialRules := util.MakeFastIntSet(
		// Needed to prevent constraint building from failing.
		int(opt.NormalizeInConst),
		// Needed when an index is forced.
		int(opt.GenerateIndexScans),
		// Needed to prevent "same fingerprint cannot map to different groups."
		int(opt.PruneJoinLeftCols),
		int(opt.PruneJoinRightCols),
		// Needed to prevent stack overflow.
		int(opt.PushFilterIntoJoinLeftAndRight),
		int(opt.PruneSelectCols),
		// Needed to prevent execbuilder error.
		// TODO(radu): the DistinctOn execution path should be fixed up so it
		// supports distinct on an empty column set.
		int(opt.EliminateDistinctNoColumns),
		int(opt.EliminateErrorDistinctNoColumns),
	)

	for i := opt.RuleName(1); i < opt.NumRuleNames; i++ {
		if rand.Float64() < probability && !essentialRules.Contains(int(i)) {
			o.disabledRules.Add(int(i))
		}
	}

	o.NotifyOnMatchedRule(func(ruleName opt.RuleName) bool {
		if o.disabledRules.Contains(int(ruleName)) {
			log.Infof(o.evalCtx.Context, "disabled rule matched: %s", ruleName.String())
			return false
		}
		return true
	})
}

func (o *Optimizer) String() string {
	return o.FormatMemo(FmtPretty)
}

// FormatMemo returns a string representation of the memo for testing
// and debugging. The given flags control which properties are shown.
func (o *Optimizer) FormatMemo(flags FmtFlags) string {
	mf := makeMemoFormatter(o, flags)
	return mf.format()
}

// RecomputeCost recomputes the cost of each expression in the lowest cost
// tree. It should be used in combination with the perturb-cost OptTester flag
// in order to update the query plan tree after optimization is complete with
// the real computed cost, not the perturbed cost.
func (o *Optimizer) RecomputeCost() {
	var c coster
	c.Init(o.evalCtx, o.mem, 0 /* perturbation */)

	root := o.mem.RootExpr()
	rootProps := o.mem.RootProps()
	o.recomputeCostImpl(root, rootProps, &c)
}

func (o *Optimizer) recomputeCostImpl(
	parent opt.Expr, parentProps *physical.Required, c Coster,
) memo.Cost {
	cost := memo.Cost(0)
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		child := parent.Child(i)
		childProps := physical.MinRequired
		switch t := child.(type) {
		case memo.RelExpr:
			childProps = t.RequiredPhysical()
		}
		cost += o.recomputeCostImpl(child, childProps, c)
	}

	switch t := parent.(type) {
	case memo.RelExpr:
		cost += c.ComputeCost(t, parentProps)
		o.mem.ResetCost(t, cost)
	}

	return cost
}

// FormatExpr is a convenience wrapper for memo.FormatExpr.
func (o *Optimizer) FormatExpr(e opt.Expr, flags memo.ExprFmtFlags) string {
	return memo.FormatExpr(e, flags, o.mem, o.catalog)
}

// InitTSRequired init time series required optimizer
func (o *Optimizer) InitTSRequired(parent opt.Expr, parentProps *physical.Required) {
	var relParent memo.RelExpr
	var relCost memo.Cost
	switch t := parent.(type) {
	case memo.RelExpr:
		relParent = t
		parent = relParent
	}
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		before := parent.Child(i)

		childProps := &physical.Required{}
		o.InitTSRequired(before, childProps)
	}
	if relParent != nil {
		var provided physical.Provided
		o.mem.SetBestProps(relParent, parentProps, &provided, relCost)
	}
}

// checkHintApply checks whether the hint is applied(include external and embedded)
func (o *Optimizer) checkHintApply(state *groupState) {
	if !state.fullyOptimized {
		return
	}
	e := state.best
	switch t := e.(type) {
	case *memo.ScanExpr:
		if t.Flags.HintType == keys.NoScanHint {
			break
		}
		if t.Flags.HintType == keys.ForceTableScan && t.Index != cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.UseTableScan && t.Index != cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.IgnoreTableScan && t.Index == cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.ForceIndexOnly && t.Flags.HintIndexes[0] != t.Index {
			panic(pgerror.New(pgcode.ExternalApplyIndexOnlyForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.ForceIndexScan && t.Flags.HintIndexes[0] != t.Index {
			panic(pgerror.New(pgcode.ExternalApplyIndexScanForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.UseIndexOnly {
			find := false
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					find = true
					break
				}
			}
			if !find {
				panic(pgerror.New(pgcode.ExternalApplyIndexOnlyUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
			}
			break
		}
		if t.Flags.HintType == keys.UseIndexScan {
			find := false
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					find = true
					break
				}
			}
			if !find {
				panic(pgerror.New(pgcode.ExternalApplyIndexScanUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
			}
			break
		}

		if t.Flags.HintType == keys.IgnoreIndexOnly {
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					panic(pgerror.New(pgcode.ExternalApplyIndexOnlyIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
				}
			}
			break
		}
		if t.Flags.HintType == keys.IgnoreIndexScan {
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					panic(pgerror.New(pgcode.ExternalApplyIndexScanIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
				}
			}
			break
		}
	case *memo.MergeJoinExpr:
		if t.HintInfo.DisallowMergeJoin {
			panic(pgerror.New(pgcode.ExternalApplyMergeJoinDisallow, "stmt hint err: DisallowMergeJoin is true, but MergeJoinExpr was generated."))
		}

	case *memo.LookupJoinExpr:
		if t.HintInfo.DisallowLookupJoin {
			panic(pgerror.New(pgcode.ExternalApplyLookupJoinDisallow, "stmt hint err: DisallowLookupJoin is true, but LookupJoinExpr was generated."))
		}
	case *memo.InnerJoinExpr:
		if t.HintInfo.DisallowHashJoin {
			panic(pgerror.New(pgcode.ExternalApplyHashJoinDisallow, "stmt hint err: DisallowHashJoin is true, but HashJoinExpr was generated."))
		}

	default:
		break
	}
}

// walk finds the TSScanExpr.
// expr is a memo tree.
func walk(expr opt.Expr, res *int) {
	switch expr.Op() {
	case opt.AntiJoinOp, opt.AntiJoinApplyOp, opt.BatchLookUpJoinOp, opt.DistinctOnOp, opt.ExceptOp, opt.ExceptAllOp, opt.ExplainOp, opt.ExportOp,
		opt.FakeRelOp, opt.FullJoinOp, opt.GroupByOp, opt.IndexJoinOp, opt.InnerJoinOp, opt.InnerJoinApplyOp, opt.InsertOp, opt.IntersectOp, opt.IntersectAllOp,
		opt.LeftJoinOp, opt.LeftJoinApplyOp, opt.LimitOp, opt.LookupJoinOp, opt.Max1RowOp, opt.MergeJoinOp, opt.OffsetOp, opt.OrdinalityOp, opt.ProjectOp,
		opt.ProjectSetOp, opt.RecursiveCTEOp, opt.RightJoinOp, opt.ScalarGroupByOp, opt.ScanOp, opt.SelectOp, opt.SelectIntoOp, opt.SemiJoinOp, opt.SemiJoinApplyOp,
		opt.SequenceSelectOp, opt.UnionOp, opt.UnionAllOp, opt.ValuesOp, opt.VirtualScanOp, opt.WindowOp, opt.WithOp, opt.WithScanOp, opt.ZigzagJoinOp:
		// should only check relation expr.
		for i := 0; i < expr.ChildCount(); i++ {
			walk(expr.Child(i), res)
		}
	case opt.TSScanOp:
		*res |= 1
	}
}

// IndexMapping defines the mapping for indexes associated with a specific table
// for multi-model processing.
type IndexMapping struct {
	TableIndex opt.TableID
	IndexID    []uint32
}

// CheckMultiModel serves as the entry point for checking all expressions
// in the memo to ensure they comply with the requirements
// for multi-model processing.
func (o *Optimizer) CheckMultiModel() {
	mem := o.mem
	root := mem.RootExpr().(memo.RelExpr)
	processRootExpr(root, mem)
	processRemainingRelTables(mem)
}

// isTSScanOrSelectTSScan is a helper function to check if the expression is a TSScanExpr or a SelectExpr with a TSScanExpr input
// for multiple model processing
func isTSScanOrSelectTSScan(expr memo.RelExpr) bool {
	switch e := expr.(type) {
	case *memo.Max1RowExpr:
		return isTSScanOrSelectTSScan(e.Input)
	case *memo.ProjectExpr:
		return isTSScanOrSelectTSScan(e.Input)
	case *memo.TSScanExpr:
		return true
	case *memo.SelectExpr:
		if _, ok := e.Input.(*memo.TSScanExpr); ok {
			return true
		}
	}
	return false
}

// processRootExpr examines the expressions within the memo structure to determine
// if the multi-model flag needs to be reset.
// for multiple model processing
func processRootExpr(root memo.RelExpr, mem *memo.Memo) {
	allTables := mem.Metadata().AllTables()
	switch expr := root.(type) {
	case *memo.ProjectExpr:
		processRootExpr(expr.Input, mem)
		res := 0
		// only handle projection when the input of ProjectExpr has TSScanExpr.
		if walk(expr.Input, &res); res > 0 {
			for _, proj := range expr.Projections {
				if castExpr, ok := proj.Element.(*memo.CastExpr); ok {
					if variableExpr, ok := castExpr.Input.(*memo.VariableExpr); ok {
						colID := variableExpr.Col
						tableID := mem.Metadata().ColumnMeta(colID).Table
						tableGroupID := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
						if tableGroupID >= 0 {
							mem.MultimodelHelper.PreGroupInfos[tableGroupID].ProjectExpr = expr
						}
					}
				}
				if functionExpr, ok := proj.Element.(*memo.FunctionExpr); ok {
					if functionExpr.FunctionPrivate.Name == "time_bucket" {
						if len(functionExpr.Args) > 0 {
							if variableExpr, ok := functionExpr.Args[0].(*memo.VariableExpr); ok {
								colID := variableExpr.Col
								tableID := mem.Metadata().ColumnMeta(colID).Table
								tableGroupID := mem.MultimodelHelper.GetTableIndexFromGroup(tableID)
								if tableGroupID >= 0 {
									aggColID := proj.Col
									if !containsColumn(mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols, aggColID) {
										mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols = append(
											mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols, aggColID)
									}
									mem.MultimodelHelper.PreGroupInfos[tableGroupID].HasTimeBucket = true
									mem.MultimodelHelper.PreGroupInfos[tableGroupID].ProjectExpr = expr
								}
							}
						}
					}
				}
			}
		}
	case *memo.SelectExpr:
		processRootExpr(expr.Input, mem)
	case *memo.DistinctOnExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpsertDistinctOnExpr:
		processRootExpr(expr.Input, mem)
	case *memo.LimitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.OffsetExpr:
		processRootExpr(expr.Input, mem)
	case *memo.SortExpr:
		processRootExpr(expr.Input, mem)
	case *memo.IndexJoinExpr:
		processRootExpr(expr.Input, mem)
	case *memo.LookupJoinExpr:
		processRootExpr(expr.Input, mem)
	case *memo.OrdinalityExpr:
		processRootExpr(expr.Input, mem)
	case *memo.Max1RowExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ProjectSetExpr:
		processRootExpr(expr.Input, mem)
	case *memo.WindowExpr:
		processRootExpr(expr.Input, mem)
	case *memo.InsertExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpdateExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpsertExpr:
		processRootExpr(expr.Input, mem)
	case *memo.DeleteExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CreateTableExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ExplainExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableSplitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableUnsplitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableRelocateExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ControlJobsExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CancelQueriesExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CancelSessionsExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ExportExpr:
		processRootExpr(expr.Input, mem)
	case *memo.TSInsertSelectExpr:
		processRootExpr(expr.Input, mem)
	case *memo.GroupByExpr, *memo.ScalarGroupByExpr:
		processGroupByOrScalarGroupByExpr(expr, mem)
	case *memo.InnerJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan && rightIsTSScan {
			mem.QueryType = memo.Unset
		}

		joinPredicates := expr.On

		for _, jp := range joinPredicates {
			if _, ok := jp.Condition.(*memo.OrExpr); ok {
				mem.QueryType = memo.Unset
				mem.MultimodelHelper.ResetReasons[memo.UnsupportedCrossJoin] = struct{}{}
			} else if mem.IsTsColsJoinPredicate(jp) {
				mem.QueryType = memo.Unset
			}
		}

		indexMapping := make(map[opt.TableID][][]opt.ColumnID)
		for _, table := range allTables {
			tableID := table.MetaID
			for i := 0; i < table.Table.IndexCount(); i++ {
				index := table.Table.Index(i)
				if index.IsUnique() {
					uint32IDs := index.IndexColumnIDs()
					columnIDs := make([]opt.ColumnID, len(uint32IDs))
					for j, id := range uint32IDs {
						columnIDs[j] = opt.ColumnID(id)
					}
					indexMapping[tableID] = append(indexMapping[tableID], columnIDs)
				}
			}
		}

		processJoinRelations(joinPredicates, expr.Left, expr.Right, mem)

		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.SemiJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.SemiJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.AntiJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.AntiJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.LeftJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedOuterJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.UnionAllExpr:
		mem.MultimodelHelper.IsUnion = true
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.UnionExpr:
		mem.MultimodelHelper.IsUnion = true
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.WithExpr:
		processRootExpr(expr.Binding, mem)
		processRootExpr(expr.Main, mem)
	default:
	}
}

// processRemainingRelTables processes the remaining tables in the memo structure
// and adds them to the table group
func processRemainingRelTables(mem *memo.Memo) {
	metadata := mem.Metadata()
	if len(mem.MultimodelHelper.TableGroup) == 0 {
		var component []opt.TableID
		for _, table := range metadata.AllTables() {
			if metadata.TableMeta(table.MetaID).Table.GetTableType() != tree.TimeseriesTable {
				component = append(component, table.MetaID)
			} else {
				mem.MultimodelHelper.TableGroup = append(mem.MultimodelHelper.TableGroup, []opt.TableID{table.MetaID})
				mem.MultimodelHelper.PreGroupInfos = append(mem.MultimodelHelper.PreGroupInfos, memo.PreGroupInfo{})
				mem.MultimodelHelper.AggNotPushDown = append(mem.MultimodelHelper.AggNotPushDown, false)
				mem.MultimodelHelper.PlanMode = append(mem.MultimodelHelper.PlanMode, memo.Undefined)
			}
		}
		mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], component...)
	} else {
		for _, table := range metadata.AllTables() {
			tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(table.MetaID)
			if tgIdx == -1 && metadata.TableMeta(table.MetaID).Table.GetTableType() != tree.TimeseriesTable {
				mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], table.MetaID)
			}
		}
	}
}

func findTSTableID(expr memo.RelExpr, mem *memo.Memo) opt.TableID {
	for i := 0; i < expr.ChildCount(); i++ {
		if v, ok := expr.Child(i).(memo.RelExpr); ok {
			if tsScanExpr, ok := v.(*memo.TSScanExpr); ok {
				return tsScanExpr.TSScanPrivate.Table
			}
			if tableID := findTSTableID(v, mem); tableID != 0 {
				return tableID
			}
		}
	}
	return 0
}

// getColIDOfExpr returns the col ID of aggregation.
func getColIDOfExpr(input opt.Expr, pIdx int, origColID opt.ColumnID) opt.ColumnID {
	switch src := (input).(type) {
	case *memo.VariableExpr:
		return src.Col
	case *memo.AggDistinctExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.CountExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.ProjectExpr:
		if origColID != -1 && len(src.Projections) > pIdx &&
			src.Projections[pIdx].Col == origColID {
			return getColIDOfExpr(src.Projections[pIdx].Element, 0, origColID)
		}
	case *memo.CastExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.CaseExpr:
		return getColIDOfExpr(src.Whens[0], 0, origColID)
	case *memo.WhenExpr:
		return getColIDOfExpr(src.Condition, 0, origColID)
	case *memo.GtExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.LtExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.GeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.LeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.EqExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.NeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.DivExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.AndExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.OrExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.MultExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.PlusExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.MinusExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.FunctionExpr:
		for _, arg := range src.Args {
			colID := getColIDOfExpr(arg, 0, origColID)
			if colID != -1 {
				return colID
			}
		}
	default:
	}
	return -1
}

func getColIDsOfExpr(input opt.Expr, pIdx int, origColID opt.ColumnID) []opt.ColumnID {
	var colIDs []opt.ColumnID

	switch src := input.(type) {
	case *memo.VariableExpr:
		colIDs = append(colIDs, src.Col)

	case *memo.AggDistinctExpr, *memo.CountExpr, *memo.CastExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(0), 0, origColID)...)

	case *memo.ProjectExpr:
		if origColID != -1 && len(src.Projections) > pIdx &&
			src.Projections[pIdx].Col == origColID {
			colIDs = append(colIDs, getColIDsOfExpr(src.Projections[pIdx].Element, 0, origColID)...)
		}

	case *memo.CaseExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Whens[0], 0, origColID)...)

	case *memo.WhenExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Condition, 0, origColID)...)

	case *memo.GtExpr, *memo.LtExpr, *memo.GeExpr, *memo.LeExpr, *memo.EqExpr, *memo.PlusExpr, *memo.MinusExpr,
		*memo.NeExpr, *memo.DivExpr, *memo.AndExpr, *memo.OrExpr, *memo.MultExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(0), 0, origColID)...)
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(1), 0, origColID)...)

	case *memo.FunctionExpr:
		for _, arg := range src.Args {
			colIDs = append(colIDs, getColIDsOfExpr(arg, 0, origColID)...)
		}
	}

	return removeDuplicateColIDs(colIDs)
}

func removeDuplicateColIDs(colIDs []opt.ColumnID) []opt.ColumnID {
	colIDMap := make(map[opt.ColumnID]struct{})
	var uniqueColIDs []opt.ColumnID

	for _, colID := range colIDs {
		if colID != -1 {
			if _, exists := colIDMap[colID]; !exists {
				colIDMap[colID] = struct{}{}
				uniqueColIDs = append(uniqueColIDs, colID)
			}
		}
	}
	return uniqueColIDs
}

// processGroupByOrScalarGroupByExpr analyzes a GroupByExpr or ScalarGroupByExpr
// to determine whether multi-model optimization is supported, whether aggregation
// can be pushed down to the TS engine, and to record pre-grouping information.
func processGroupByOrScalarGroupByExpr(expr memo.RelExpr, mem *memo.Memo) {
	processRootExpr(expr.Child(0).(memo.RelExpr), mem)
	processRemainingRelTables(mem)
	unsupportedMultiModel := false
	metaData := mem.Metadata()
	if projectExpr, ok := expr.Child(0).(*memo.ProjectExpr); ok {
		for _, proj := range projectExpr.Projections {
			if element, ok := proj.Element.(opt.Expr); ok {
				if execInTSEngine, _ := memo.CheckExprCanExecInTSEngine(element, memo.ExprPosProjList,
					mem.GetWhiteList().CheckWhiteListParam); !execInTSEngine {
					// this path used to fallback because of memo.UnsupportedAggFuncOrExpr in outside-in plan,
					// but now outside-in supports this path and leave unsupported agg/expr to relational side
					colIDs := getColIDsOfExpr(element, 0, -1)
					tableIDs := make(map[opt.TableID]struct{})

					// if we can't find the colID or table group id, we can't determine the plan mode
					hasInvalidColID := false
					hasInvalidTgIdx := false

					for _, colID := range colIDs {
						if colID == -1 {
							hasInvalidColID = true
						}
						if colID != -1 {
							tableID := metaData.ColumnMeta(colID).Table
							tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
							tableIDs[tableID] = struct{}{}
							if tgIdx == -1 {
								hasInvalidTgIdx = true
							}
						}
					}

					if hasInvalidColID || hasInvalidTgIdx {
						// if there is only one table group, we can still safely set its flag
						if len(mem.MultimodelHelper.TableGroup) == 1 {
							mem.MultimodelHelper.AggNotPushDown[0] = true
						} else {
							// otherwise, we need to set the flag to unsupported and fallback
							unsupportedMultiModel = true
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedAggFuncOrExpr] = struct{}{}
						}
					} else {
						// only if the table is a timeseries table, we can't push down the aggregation,
						// otherwise we can consider both inside-out and outside-in
						for tableID := range tableIDs {
							if metaData.TableMeta(tableID).Table.GetTableType() == tree.TimeseriesTable {
								tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
								if tgIdx != -1 {
									mem.MultimodelHelper.AggNotPushDown[tgIdx] = true
								}
							}
						}
					}
				}
			}
		}
	}

	// check data types of grouping cols
	groupingCols := expr.Private().(*memo.GroupingPrivate).GroupingCols
	groupingCols.ForEach(func(col opt.ColumnID) {
		if !memo.CheckDataType(metaData.ColumnMeta(col).Type) ||
			!memo.CheckDataLength(metaData.ColumnMeta(col).Type) {
			tableID := metaData.ColumnMeta(col).Table
			if tableID == 0 {
				if projectExpr, ok := expr.Child(0).(*memo.ProjectExpr); ok {
					for _, proj := range projectExpr.Projections {
						if proj.Col == col {
							if element, ok := proj.Element.(opt.Expr); ok {
								colID := getColIDOfExpr(element, 0, col)
								if colID != -1 {
									tableID = metaData.ColumnMeta(colID).Table
								}
							}
						}
					}
				}
			}
			tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
			if tgIdx >= 0 && !mem.MultimodelHelper.AggNotPushDown[tgIdx] {
				if mem.MultimodelHelper.PlanMode[tgIdx] == memo.OutsideIn {
					mem.QueryType = memo.Unset
					mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
				} else {
					mem.MultimodelHelper.PlanMode[tgIdx] = memo.InsideOut
				}
			} else {
				unsupportedMultiModel = true
				mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
			}
		}
	})

	handleGroupByOrdering := false
	hasCountRow := false
	countRowTableIdx := -1
	skipProj := 0

	var avgAggs []struct {
		FuncID opt.ColumnID
		ColID  opt.ColumnID
	}
	countAggs := make(map[opt.ColumnID]opt.ColumnID)
	sumAggs := make(map[opt.ColumnID]opt.ColumnID)

	// check data types of aggregation columns
	aggregations := *expr.Child(1).(*memo.AggregationsExpr)
	for pIdx, agg := range aggregations {
		var aggColID opt.ColumnID
		var aggColTableID opt.TableID

		aggFuncID := agg.Col
		noPreGrouping := false
		// skip the pregrouping analysis for count(*) expression
		if agg.Agg.ChildCount() == 0 {
			if _, ok := agg.Agg.(*memo.CountRowsExpr); ok {
				aggColTableID = findTSTableID(expr, mem)
				hasCountRow = true
			}
		} else {
			aggColID = getColIDOfExpr(agg.Agg.Child(0), 0, -1)
			aggColTableID = mem.Metadata().ColumnMeta(aggColID).Table
		}

		if aggColTableID == 0 {
			noPreGrouping = true
			aggColID = getColIDOfExpr(expr.Child(0), pIdx-skipProj, aggColID)
			if aggColID != -1 {
				aggColTableID = mem.Metadata().ColumnMeta(aggColID).Table
			} else {
				skipProj++
			}
		} else {
			skipProj++
		}
		aggType := reflect.TypeOf(agg.Agg).Elem().Name()
		preGroupInfos := mem.MultimodelHelper.PreGroupInfos
		if len(preGroupInfos) < len(mem.MultimodelHelper.TableGroup) {
			for len(preGroupInfos) < len(mem.MultimodelHelper.TableGroup) {
				preGroupInfos = append(preGroupInfos, memo.PreGroupInfo{})
			}
		}

		groupIndex := mem.MultimodelHelper.GetTSTableIndexFromGroup(aggColTableID)
		if hasCountRow && countRowTableIdx == -1 {
			countRowTableIdx = groupIndex
		}

		if noPreGrouping {
			if aggColTableID == 0 {
				continue
			}
			if metaData.TableMeta(aggColTableID).Table.GetTableType() == tree.TimeseriesTable {
				mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
				continue
			}
		}

		for i := 0; i < agg.Child(0).ChildCount(); i++ {
			if scalarExpr, ok := agg.Child(0).Child(i).(opt.ScalarExpr); ok {
				if !memo.CheckDataType(scalarExpr.DataType()) {
					if groupIndex >= 0 && !mem.MultimodelHelper.AggNotPushDown[groupIndex] {
						if mem.MultimodelHelper.PlanMode[groupIndex] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[groupIndex] = memo.InsideOut
						}
					} else {
						unsupportedMultiModel = true
						mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
					}
				}
			}
		}

		if metaData.TableMeta(aggColTableID).Table.GetTableType() != tree.TimeseriesTable {
			continue
		}

		if groupIndex != -1 {
			preGroupInfos[groupIndex].AggColumns = append(preGroupInfos[groupIndex].AggColumns, aggColID)
			preGroupInfos[groupIndex].AggFuncs = append(preGroupInfos[groupIndex].AggFuncs, aggType)
			switch aggType {
			case "CountExpr", "CountRowsExpr":
				// preGroupInfos[groupIndex].NewAggColumns = append(preGroupInfos[groupIndex].NewAggColumns, aggFuncID)
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				countAggs[aggColID] = aggFuncID
			case "AvgExpr":
				preGroupInfos[groupIndex].AvgFuncColumns = append(preGroupInfos[groupIndex].AvgFuncColumns, aggFuncID)
				avgAggs = append(avgAggs, struct {
					FuncID opt.ColumnID
					ColID  opt.ColumnID
				}{
					FuncID: aggFuncID,
					ColID:  aggColID,
				})
			case "SumExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				sumAggs[aggColID] = aggFuncID
			case "MaxExpr", "MinExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
			case "LastExpr", "FirstExpr", "LastRowExpr", "FirstRowExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				mem.MultimodelHelper.HasLastAgg = true
				if mem.MultimodelHelper.PlanMode[groupIndex] == memo.InsideOut {
					mem.QueryType = memo.Unset
					mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
				} else {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.OutsideIn
				}
			case "AggDistinctExpr":
				if mem.MultimodelHelper.PlanMode[groupIndex] != memo.InsideOut {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.OutsideIn
				} else {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.Undefined
					mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
				}
			default:
				mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
			}
			preGroupInfos[groupIndex].GroupByExprPos = append(preGroupInfos[groupIndex].GroupByExprPos, pIdx)
			preGroupInfos[groupIndex].BuildingPreGrouping = true

			var mappings1 []memo.AvgToCountOrSumMapping
			for _, avg := range avgAggs {
				countFuncID := opt.ColumnID(0)
				if matchedFuncID, exists := countAggs[avg.ColID]; exists {
					countFuncID = matchedFuncID
				}

				mappings1 = append(mappings1, memo.AvgToCountOrSumMapping{
					AvgFuncID: avg.FuncID,
					FuncID:    countFuncID,
				})
			}
			preGroupInfos[groupIndex].AvgToCountMapping = mappings1

			var mappings2 []memo.AvgToCountOrSumMapping
			for _, avg := range avgAggs {
				sumFuncID := opt.ColumnID(0)
				if matchedFuncID, exists := sumAggs[avg.ColID]; exists {
					sumFuncID = matchedFuncID
				}

				mappings2 = append(mappings2, memo.AvgToCountOrSumMapping{
					AvgFuncID: avg.FuncID,
					FuncID:    sumFuncID,
				})
			}
			preGroupInfos[groupIndex].AvgToSumMapping = mappings2

			if !handleGroupByOrdering {
				groupByInput := expr.Child(0).(memo.RelExpr)
				for _, c := range groupByInput.ProvidedPhysical().Ordering {
					found := false
					for _, col := range preGroupInfos[groupIndex].GroupingCols {
						if col == c.ID() {
							found = true
							break
						}
					}
					if !found {
						if !containsColumn(preGroupInfos[groupIndex].GroupingCols, aggColID) {
							preGroupInfos[groupIndex].GroupingCols = append(preGroupInfos[groupIndex].GroupingCols, c.ID())
						}
					}
				}
				handleGroupByOrdering = true
			}
		}

		mem.MultimodelHelper.PreGroupInfos = preGroupInfos

		srcExpr := agg.Agg
		hashCode := memo.GetExprHash(srcExpr)
		if groupIndex >= 0 && !mem.GetWhiteList().CheckWhiteListParam(hashCode, memo.ExprPosProjList) {
			mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
		}
	}

	// if the agg in the multi-model query is countrows only so far, then don't choose inside-out plan
	// to guarantee the correct result as it is under join context
	if hasCountRow && countRowTableIdx != -1 && len(mem.MultimodelHelper.PreGroupInfos[countRowTableIdx].AggFuncs) == 1 {
		if mem.MultimodelHelper.PlanMode[countRowTableIdx] == memo.InsideOut {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
		} else {
			mem.MultimodelHelper.PlanMode[countRowTableIdx] = memo.OutsideIn
		}
	}

	// handle reduced grouping columns
	groupingCols.ForEach(func(col opt.ColumnID) {
		tgIdx := mem.MultimodelHelper.GetTableIndexFromGroup(metaData.ColumnMeta(col).Table)
		if tgIdx >= 0 {
			if !containsColumn(mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols, col) {
				mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols = append(mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols, col)
			}
		}
	})

	if unsupportedMultiModel {
		mem.QueryType = memo.Unset
	}
}

// processJoinRelations constructs TableGroup, a two-dimensional array that
// organizes time-series and relational tables based on their join relationships.
//
//   - Each subarray in TableGroup represents a group of related tables.
//   - The first element in each subarray is a timeseries table's TableID.
//   - The subsequent elements are relational tables that have a direct join
//     relationship with the timeseries table.
//   - If a table does not directly join a timeseries table but joins another
//     relational table, it is added to the same subarray as that relational table.
//
// **Limitations:**
//   - Cross joins are not supported, and if no no valid tables are found to
//     construct TableGroup, the plan will fall back
//   - Only equality conditions (EqExpr) are considered for table grouping;
//   - TableIDs must be resolvable from ColumnIDs; virtual or derived columns
//     without a clear TableID will not be included in TableGroup.
func processJoinRelations(
	joinPredicates memo.FiltersExpr, left memo.RelExpr, right memo.RelExpr, mem *memo.Memo,
) {
	tableGroup := mem.MultimodelHelper.TableGroup
	pendingRelations := make(map[opt.TableID][]opt.TableID)

	if joinPredicates.ChildCount() == 0 {
		if len(mem.MultimodelHelper.TableGroup) == 0 {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedCrossJoin] = struct{}{}
		} else {
			switch lc := left.(type) {
			case *memo.ScanExpr:
				if mem.MultimodelHelper.GetTSTableIndexFromGroup(lc.ScanPrivate.Table) == -1 {
					mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], lc.ScanPrivate.Table)
				}
			default:
			}
			switch rc := right.(type) {
			case *memo.ScanExpr:
				if mem.MultimodelHelper.GetTSTableIndexFromGroup(rc.ScanPrivate.Table) == -1 {
					mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], rc.ScanPrivate.Table)
				}
			default:
			}
		}
		return
	}

	for _, jp := range joinPredicates {
		if eqExpr, ok := jp.Condition.(*memo.EqExpr); ok {
			var leftTableID, rightTableID opt.TableID
			var leftColID, rightColID opt.ColumnID
			var leftIsTimeSeries, rightIsTimeSeries bool

			if leftVar, ok := eqExpr.Left.(*memo.VariableExpr); ok {
				leftColID = leftVar.Col
				leftTableID = mem.Metadata().ColumnMeta(leftColID).Table

				// handle virtual column
				if leftTableID == 0 {
					leftColID = getColIDOfExpr(left, 0, leftColID)
					if leftColID >= 0 {
						leftTableID = mem.Metadata().ColumnMeta(leftColID).Table
					}
				}
				if leftTableID != 0 {
					leftTable := mem.Metadata().Table(leftTableID)
					leftIsTimeSeries = leftTable.GetTableType() == tree.TimeseriesTable
				}
			}

			if rightVar, ok := eqExpr.Right.(*memo.VariableExpr); ok {
				rightColID = rightVar.Col
				rightTableID = mem.Metadata().ColumnMeta(rightColID).Table

				// handle virtual column
				if rightTableID == 0 {
					rightColID = getColIDOfExpr(right, 0, rightColID)
					if rightColID >= 0 {
						rightTableID = mem.Metadata().ColumnMeta(rightColID).Table
					}
				}
				if rightTableID != 0 {
					rightTable := mem.Metadata().Table(rightTableID)
					rightIsTimeSeries = rightTable.GetTableType() == tree.TimeseriesTable
				}
			}

			if leftIsTimeSeries || rightIsTimeSeries {
				tsTableID := leftTableID
				relTableID := rightTableID
				groupingColID := leftColID
				if !leftIsTimeSeries {
					tsTableID, relTableID = rightTableID, leftTableID
					groupingColID = rightColID
				}

				found := false
				var index int
				for i, component := range tableGroup {
					if component[0] == tsTableID {
						if !contains(component, relTableID) && relTableID != 0 {
							tableGroup[i] = append(component, relTableID)
						}
						index = i
						found = true
						break
					}
				}
				if !found && tsTableID != 0 {
					index = len(tableGroup)
					if relTableID != 0 {
						tableGroup = append(tableGroup, []opt.TableID{tsTableID, relTableID})
					} else {
						tableGroup = append(tableGroup, []opt.TableID{tsTableID})
					}
				}

				if leftIsTimeSeries {
					if rightColID != -1 && (!memo.CheckDataType(mem.Metadata().ColumnMeta(rightColID).Type) ||
						!memo.CheckDataLength(mem.Metadata().ColumnMeta(rightColID).Type)) {
						if mem.MultimodelHelper.PlanMode[index] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[index] = memo.InsideOut
						}
					}
				} else {
					if leftColID != -1 && (!memo.CheckDataType(mem.Metadata().ColumnMeta(leftColID).Type) ||
						!memo.CheckDataLength(mem.Metadata().ColumnMeta(leftColID).Type)) {
						if mem.MultimodelHelper.PlanMode[index] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[index] = memo.InsideOut
						}
					}
				}

				for len(mem.MultimodelHelper.PreGroupInfos) <= index {
					mem.MultimodelHelper.PreGroupInfos = append(mem.MultimodelHelper.PreGroupInfos, memo.PreGroupInfo{})
					mem.MultimodelHelper.AggNotPushDown = append(mem.MultimodelHelper.AggNotPushDown, false)
					mem.MultimodelHelper.PlanMode = append(mem.MultimodelHelper.PlanMode, memo.Undefined)
				}
				if !containsColumn(mem.MultimodelHelper.PreGroupInfos[index].GroupingCols, groupingColID) {
					mem.MultimodelHelper.PreGroupInfos[index].GroupingCols = append(mem.MultimodelHelper.PreGroupInfos[index].GroupingCols, groupingColID)
				}
			} else {
				pendingRelations[leftTableID] = append(pendingRelations[leftTableID], rightTableID)
				pendingRelations[rightTableID] = append(pendingRelations[rightTableID], leftTableID)
			}
		}
	}

	if len(pendingRelations) > 0 {
		processed := make(map[opt.TableID]bool)

		for tableID, connectedTables := range pendingRelations {
			alreadyAssigned := false
			for _, component := range tableGroup {
				if contains(component, tableID) {
					alreadyAssigned = true
					break
				}
			}
			if alreadyAssigned {
				processed[tableID] = true
				continue
			}

			assigned := false
			for i, component := range tableGroup {
				for _, tID := range component {
					if contains(connectedTables, tID) {
						tableGroup[i] = append(component, tableID)
						assigned = true
						break
					}
				}
				if assigned {
					break
				}
			}

			if assigned {
				processed[tableID] = true
			}
		}
	}

	mem.MultimodelHelper.TableGroup = tableGroup
}

func contains(list []opt.TableID, target opt.TableID) bool {
	for _, item := range list {
		if item == target {
			return true
		}
	}
	return false
}

func containsColumn(list []opt.ColumnID, target opt.ColumnID) bool {
	for _, item := range list {
		if item == target {
			return true
		}
	}
	return false
}

func checkIfJoinColsContainUniqueIndex(
	tableID opt.TableID,
	joinColIDs []opt.ColumnID,
	indexMapping map[opt.TableID][][]opt.ColumnID,
	mem *memo.Memo,
) bool {
	uniqueIndexes, exists := indexMapping[tableID]
	if !exists {
		return false
	}

	joinColSet := make(map[opt.ColumnID]bool)
	for _, col := range joinColIDs {
		adjustCol := mem.Metadata().GetTagIDByColumnID(col) + 1
		joinColSet[opt.ColumnID(adjustCol)] = true
	}

	for _, uniqueIndexCols := range uniqueIndexes {
		if isSubset(uniqueIndexCols, joinColSet) {
			return true
		}
	}

	return false
}

func isSubset(indexCols []opt.ColumnID, joinColSet map[opt.ColumnID]bool) bool {
	for _, col := range indexCols {
		if !joinColSet[col] {
			return false
		}
	}
	return true
}

// JoinOrderBuilder returns the JoinOrderBuilder instance that the optimizer is
// currently using to reorder join trees.
func (o *Optimizer) JoinOrderBuilder() *JoinOrderBuilder {
	return o.jb
}
